package com.zzz.HbaseMr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * MapReduce job that copies data from one HBase table into another.
 *
 * <p>The job reads the {@code name} and {@code age} columns of column family {@code f1}
 * from the {@code myuser} table and writes them, unchanged, into the {@code myuser2} table.
 * Reading is done through a {@link TableMapper} and writing through a {@link TableReducer}.
 */
public class HBaseMR {

    /** Table the job reads from. */
    private static final String SOURCE_TABLE = "myuser";
    /** Table the job writes to. */
    private static final String TARGET_TABLE = "myuser2";

    /** Column family that holds the copied columns. */
    private static final byte[] FAMILY = Bytes.toBytes("f1");
    /** Qualifier of the {@code name} column. */
    private static final byte[] QUALIFIER_NAME = Bytes.toBytes("name");
    /** Qualifier of the {@code age} column. */
    private static final byte[] QUALIFIER_AGE = Bytes.toBytes("age");

    /**
     * Turns every scanned row into a {@link Put} that carries only the
     * {@code f1:name} and {@code f1:age} cells of that row.
     */
    public static class HBaseMapper extends TableMapper<Text, Put> {
        /**
         * Builds one {@link Put} per input row and emits it keyed by the row key.
         *
         * @param key     the row key of the current row
         * @param value   all cells returned by the scan for this row
         * @param context the task context used to emit the (row key, put) pair
         * @throws IOException          if a cell cannot be added to the put or the write fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            // copyBytes() honours the writable's offset/length, whereas get() hands back the
            // whole backing array, which is only correct when the key spans the entire array.
            byte[] rowKey = key.copyBytes();
            Put put = new Put(rowKey);

            for (Cell cell : value.rawCells()) {
                // Compare against the cell's bytes in place instead of cloning the family and
                // qualifier arrays and decoding them to Strings for every cell.
                boolean wanted = CellUtil.matchingColumn(cell, FAMILY, QUALIFIER_NAME)
                        || CellUtil.matchingColumn(cell, FAMILY, QUALIFIER_AGE);
                if (wanted) {
                    put.add(cell);
                }
            }

            // A Put without any cell cannot be written, so rows lacking both columns are skipped.
            if (!put.isEmpty()) {
                context.write(new Text(Bytes.toString(rowKey)), put);
            }
        }
    }

    /**
     * Forwards every {@link Put} produced by the mappers to the output table.
     */
    public static class HbaseReducer extends TableReducer<Text, Put,ImmutableBytesWritable>{
        /**
         * Writes all puts received for a row key to the output.
         *
         * @param key     the row key, as text, that the puts were grouped by
         * @param values  the puts emitted by the mappers for this row key
         * @param context the task context used to write to the output table
         * @throws IOException          if writing a put fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
            for (Put put : values) {
                // The output key is passed as null: each Put already carries its own row key.
                context.write(null, put);
            }
        }
    }

    /**
     * Configures and runs the copy job, then exits with status 0 on success and 1 on failure.
     *
     * @param args command-line arguments (not used)
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if the wait for job completion is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum","node01,node02,node03");

        Scan scan = new Scan();
        // Fetch only the two columns the mapper keeps, so no other data is read and shipped.
        scan.addColumn(FAMILY, QUALIFIER_NAME);
        scan.addColumn(FAMILY, QUALIFIER_AGE);
        // Settings recommended for MapReduce scans: fetch rows in larger batches, and do not
        // fill the region servers' block cache with data from a one-off full-table scan.
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        Job job = Job.getInstance(configuration, "hbaseMr");
        job.setJarByClass(HBaseMR.class);

        // Set up the map side: input table, scan, mapper class and map output types.
        TableMapReduceUtil.initTableMapperJob(SOURCE_TABLE, scan, HBaseMapper.class, Text.class, Put.class, job);

        // Set up the reduce side: output table and reducer class.
        TableMapReduceUtil.initTableReducerJob(TARGET_TABLE, HbaseReducer.class, job);

        job.setNumReduceTasks(1);
        boolean succeeded = job.waitForCompletion(true);

        System.exit(succeeded ? 0 : 1);
    }
}
